Introduction
Warning: this page can take up to a minute to load. PySpark is an invaluable tool for processing datasets in parallel partitions for efficiency. The data can be written and merged to another file in many different formats, such as CSV, TSV, JSON, Parquet, and XML.
The structured pipeline once written will always be reproducible and easy to maintain.
The Canadian Anti-Fraud Centre updates their dataset every quarter and maintains their dataset on the CKAN platform. The CKAN platform allows federal and municipal governments, as well as companies, to maintain their catalogues of datasets in a consistent and transparent way, whether the data is public or private.
In this exercise, we are going to use the Canadian Anti-Fraud Centre Reporting Dataset on the CKAN platform and do the following:
1. Extract English and French field names into two datasets to represent distinct but replicated English and French datasets (in progress).
2. Change date type to date and change fields to numeric that we want to aggregate.
3. Include monthly and yearly aggregate datasets.
4. Filter out invalid records that have invalid Country names.
5. Streamlined access to new generated datasets.
#| echo: true
#%run cafc_quarterly_fraud_data_pipeline.py #Set up on cronjob to run each quarter
This outputs the script used in pyspark to clean the data downloaded from:
# Resolve the path to the PySpark pipeline script relative to the project root
pathway <- here::here("posts", "pyspark", "cafc_quarterly_fraud_data_pipeline.py")
pathway
[1] "/Users/Eileen/Desktop/GoData/Blog/posts/pyspark/cafc_quarterly_fraud_data_pipeline.py"
# Read the pipeline script and print it verbatim, one source line per row.
# `sep = "\n"` joins the lines with bare newlines; the previous " \n " value
# padded every line with stray spaces.
lines <- readLines(pathway, warn = FALSE)
cat(lines, sep = "\n")
import os
#import pandas as pd
#import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, to_date, trim, when, year, month, sum as _sum
#from datetime import datetime
#from reportlab.lib.pagesizes import letter
#from reportlab.lib import colors
#from reportlab.pdfgen import canvas
# ------------ 1: START SPARK SESSION ------------
spark = (
    SparkSession.builder
    .appName("CAFC_Quarterly_Refinery_Pipeline")
    .getOrCreate()
)
# ------------ 2: READ & CLEAN DATA ------------
# The raw CAFC export is tab-separated with a header row.
df = (
    spark.read
    .option("header", "true")
    .option("delimiter", "\t")
    .csv("cafc.tsv")
)
# French-language duplicates of the English columns; drop them so this
# pipeline produces the English dataset only.
cols_to_drop = (
    "Type de plainte recue",
    "Pays",
    "Province/Etat",
    "Categories thematiques sur la fraude et la cybercriminalite",
    "Methode de sollicitation",
    "Genre",
    "Langue de correspondance",
    "Type de plainte",
)
df = df.drop(*cols_to_drop)
# Map the raw (often bilingual) headers onto short snake_case names.
column_renames = [
    ("Numero d'identification / Number ID", "id"),
    ("Date Received / Date recue", "date"),
    ("Complaint Received Type", "complaint_type"),
    ("Country", "country"),
    ("Province/State", "province"),
    ("Fraud and Cybercrime Thematic Categories", "fraud_cat"),
    ("Solicitation Method", "sol_method"),
    ("Gender", "gender"),
    ("Language of Correspondence", "lang_cor"),
    ("Victim Age Range / Tranche d'age des victimes", "age_range"),
    ("Complaint Type", "complaint_subtype"),
    ("Number of Victims / Nombre de victimes", "num_victims"),
    ("Dollar Loss /pertes financieres", "dollar_loss"),
]
for raw_name, clean_name in column_renames:
    # Rename defensively: skip any header absent from this quarter's file.
    if raw_name in df.columns:
        df = df.withColumnRenamed(raw_name, clean_name)
# Blank strings become NULL; everything else is cast to an integer count.
df = df.withColumn("num_victims",
when(trim(col("num_victims")) == "", None).otherwise(col("num_victims").cast("integer")))
# Strip "$" and "," before casting the dollar amount to double; blanks -> NULL.
df = df.withColumn("dollar_loss",
when(trim(col("dollar_loss")) == "", None)
.otherwise(regexp_replace(col("dollar_loss"), "[$,]", "").cast("double")))
# Parse the received date; values that don't match the format become NULL.
df = df.withColumn("date", to_date("date", "yyyy-MM-dd"))
# Drop records with an unspecified country.
# NOTE(review): `!=` also silently drops rows where country is NULL
# (Spark's three-valued comparison) — confirm that is intended.
df = df.filter(df.country != "Not Specified")
#.show(truncate=False)
# ------------ 3: DETECT QUARTER ------------
# Label the output directory with the quarter of the newest record.
# (The previous version also computed min(date), which was never used
# anywhere in the script and cost an extra full pass over the data.)
max_date = df.agg({"date": "max"}).collect()[0][0]
if max_date is None:
    # Every date failed to parse (or the dataset is empty); fail loudly
    # instead of crashing with an AttributeError on `.year` below.
    raise ValueError("No valid 'date' values found; cannot determine quarter.")
year_val = max_date.year
quarter_val = (max_date.month - 1) // 3 + 1  # months 1-3 -> Q1, ..., 10-12 -> Q4
label = f"{year_val}_Q{quarter_val}"
out_dir = f"outputs_{label}"
os.makedirs(out_dir, exist_ok=True)
# ------------ 4: SAVE CLEANED DATA ------------
# Spark writes a directory of part-* files, not a single CSV.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv(f"{out_dir}/cleaned_cafc")
)
# ------------ 5: SUMMARIES ------------
def _loss_and_victim_totals():
    # Aggregate expressions shared by the monthly and yearly summaries.
    return [
        _sum("dollar_loss").alias("total_loss"),
        _sum("num_victims").alias("total_victims"),
    ]

# Per-month totals, ordered chronologically.
monthly_summary = (
    df.groupBy(year("date").alias("year"), month("date").alias("month"))
    .agg(*_loss_and_victim_totals())
    .orderBy("year", "month")
)
monthly_summary.write.mode("overwrite").option("header", True).csv(f"{out_dir}/monthly_summary")

# Per-year totals.
yearly_summary = (
    df.groupBy(year("date").alias("year"))
    .agg(*_loss_and_victim_totals())
    .orderBy("year")
)
yearly_summary.write.mode("overwrite").option("header", True).csv(f"{out_dir}/yearly_summary")

print(f"CAFC_Quarterly_Refinery_Pipeline script complete: {out_dir}")
Required Libraries
# Load necessary libraries
# install.packages("tidyverse")
suppressPackageStartupMessages (library (tidyverse))
This is where the new files will be stored
# Directory holding the Spark-partitioned CSV output of the cleaned data.
# NOTE(review): the quarter label is hard-coded; it must match the pipeline's
# out_dir ("outputs_<year>_Q<quarter>") — update after each quarterly run.
data_directory <- "outputs_2025_Q2/cleaned_cafc"
data_directory
[1] "outputs_2025_Q2/cleaned_cafc"
When the files are written out, there will be several part files, one per partition.
# List the CSV part files Spark wrote (one per partition).
# The regex must be "\\.csv$"; the previous pattern contained stray spaces
# and would match no files at all.
file_paths <- list.files(path = data_directory, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)
file_paths
[1] "outputs_2025_Q2/cleaned_cafc/part-00000-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[2] "outputs_2025_Q2/cleaned_cafc/part-00001-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[3] "outputs_2025_Q2/cleaned_cafc/part-00002-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[4] "outputs_2025_Q2/cleaned_cafc/part-00003-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[5] "outputs_2025_Q2/cleaned_cafc/part-00004-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[6] "outputs_2025_Q2/cleaned_cafc/part-00005-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[7] "outputs_2025_Q2/cleaned_cafc/part-00006-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
[8] "outputs_2025_Q2/cleaned_cafc/part-00007-b9ce166a-69b6-41ae-b68c-17dc6f4c3e5f-c000.csv"
Merge the partitions to one file
# Read every partition file and row-bind them into one data frame
merged_df <- map_dfr(file_paths, read_csv, show_col_types = FALSE)
Show records of merged file
# Preview the merged data frame (tibble printing shows the first 6 rows)
head (merged_df)
# A tibble: 6 × 13
id date complaint_type country province fraud_cat sol_method gender
<dbl> <date> <chr> <chr> <chr> <chr> <chr> <chr>
1 2 2021-01-02 CAFC Website Canada Quebec Merchand… Internet Male
2 4 2021-01-02 CAFC Website Canada British C… Vendor F… Text mess… Male
3 5 2021-01-02 NCFRS Canada Alberta Merchand… Internet Female
4 6 2021-01-02 CAFC Website Canada Ontario Phishing Text mess… Male
5 8 2021-01-02 CAFC Website Canada Saskatche… Merchand… Other/unk… Not A…
6 9 2021-01-02 CAFC Website Canada British C… Job Internet-… Female
# ℹ 5 more variables: lang_cor <chr>, age_range <chr>, complaint_subtype <chr>,
# num_victims <dbl>, dollar_loss <dbl>
# Point at the monthly-summary partitions and list their CSV part files.
# The regex must be "\\.csv$"; the previous pattern contained stray spaces
# and would match no files at all.
data_directory <- "outputs_2025_Q2/monthly_summary"
file_paths <- list.files(path = data_directory, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)
file_paths
[1] "outputs_2025_Q2/monthly_summary/part-00000-bb97576f-ce4a-400a-91ef-d8af632d0b57-c000.csv"
# Combine the monthly-summary partitions into a single data frame
monthly_summary_df <- map_dfr(file_paths, read_csv, show_col_types = FALSE)
# A tibble: 6 × 4
year month total_loss total_victims
<dbl> <dbl> <dbl> <dbl>
1 2021 1 12894120. 4411
2 2021 2 27042043. 5055
3 2021 3 17017938. 6111
4 2021 4 22120277. 4711
5 2021 5 19401052. 4348
6 2021 6 19796861. 4633
# Point at the yearly-summary partitions and list their CSV part files.
# The regex must be "\\.csv$"; the previous pattern contained stray spaces
# and would match no files at all.
data_directory <- "outputs_2025_Q2/yearly_summary"
file_paths <- list.files(path = data_directory, pattern = "\\.csv$", all.files = TRUE, full.names = TRUE)
file_paths
[1] "outputs_2025_Q2/yearly_summary/part-00000-1ce6d64d-3176-4cbd-8fb9-7dd33486ba65-c000.csv"
# Combine the yearly-summary partitions into a single data frame
yearly_summary_df <- map_dfr(file_paths, read_csv, show_col_types = FALSE)
# A tibble: 5 × 3
year total_loss total_victims
<dbl> <dbl> <dbl>
1 2021 309470307. 52830
2 2022 444393290. 47537
3 2023 497598850. 35714
4 2024 527081208. 30241
5 2025 270106027. 14181
Deployment
library(downloadthis)

# Render a download button for the merged English dataset
merged_df %>%
  download_this(
    output_name = "clean_cafc_en.csv",
    output_extension = ".csv",
    button_label = "Download clean_cafc_en.csv",
    button_type = "default",
    self_contained = TRUE,
    has_icon = TRUE,
    icon = "fa fa-save",
    id = "cafc-btn"
  )